跳到主要内容

Zookeeper 整合进 Java

配置 Java 环境

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>

编写启动类测试

public class ZookeeperConnection {
public static void main(String[] args) {

CountDownLatch countDownLatch = new CountDownLatch(1); // 计数器对象

/*
第一个参数:服务器的ip和端口
第二个参数:客户端与服务器之间的会话超时时间,以毫秒为单位
第三个参数:监视器对象
*/
try (ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (event) -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
countDownLatch.countDown();
}
})) {
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 打印会话编号
System.out.println(zooKeeper.getSessionId());
} catch (Exception e) {
e.printStackTrace();
}
}
}

注意:因为 zookeeper 这个包依赖 log4j,如果没有添加配置会抛出如下警告(可以无视)

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

可以通过引入这个日志框架来消除这个错误

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.14.0</version>
</dependency>

然后添加一个 log4j.properties 文件丢到 resources 根目录里面去,里面编写如下配置

log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

如果项目已经有日志框架了,可以直接排除掉这个 log4j 依赖

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

编写单元测试

引入依赖

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.0</version>
<scope>test</scope>
</dependency>

注意 @BeforeAll 必须是静态方法

class ZookeeperConnectionTest {

static ZooKeeper zooKeeper = null;

// 先连接上
@BeforeAll
static void connect() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (event) -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
countDownLatch.countDown();
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 打印会话编号
System.out.println(zooKeeper.getSessionId());
}

/**
* 同步创建节点
*/
@Test
void create1() throws KeeperException, InterruptedException {
/*
第一个参数:节点的路径
第二个参数:节点的数据
第三个参数:权限列表 ZooDefs.Ids.OPEN_ACL_UNSAFE:world:anyone:cdrwa /
第四个参数:节点的类子持久化节点
*/
zooKeeper.create("/hello", "hello_world".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}

/**
* 关闭连接
*/
@AfterAll
static void after() throws InterruptedException {
if (zooKeeper != null) {
System.out.println("关闭成功");
zooKeeper.close();
}
}

}

连接 zookeeper 集群

public class ZkCluster {
public static void main(String[] args) {
try {
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181, 127.0.0.1:2182, 127.0.0.1:2183", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 会话编号
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}